#!/usr/bin/env python
# -*- coding:utf-8 -*-

import sys
from twisted.internet import task ,reactor
import json
import config.glob_conf
import requests
from requests.auth import HTTPBasicAuth
import datetime
from scpy.logger import get_logger
from pymongo import MongoClient
from kafka import SimpleConsumer, KafkaClient, SimpleProducer, KafkaConsumer


from scpy.rabbit_mq import get_queue_size
import pika
from scpy.rabbit_asy_producer import AsyPublisher
import time
# from scpy.logger import get_json_logger
from scpy.rabbit_mq import get_queue_size

logger = get_logger(__file__)


reload(sys)
sys.setdefaultencoding(u'utf-8')


# --- MongoDB connections (module-level side effects on import) ---
# Source collection: company documents per province (read by kafka_producer).
conn = MongoClient(config.glob_conf.MONGO_HOST,27017)
db = conn[config.glob_conf.NAME_DB]
collection = db[config.glob_conf.NAME_COLL]


# Crawler bookkeeping database.
db_log = conn['crawler_log']
collection_log = db_log['saic_ids_log']

# Dedup collection — only referenced from commented-out code below;
# presumably holds already-crawled company names. TODO confirm before removing.
collection_data = db_log['saic_company_name']

# Province code -> RabbitMQ queue path. The Kafka topic name for a province
# is the last path segment (e.g. 'saic_cq'), derived via .split('/')[-1].
QUEUE_DICT = {
    'cq':'/queue/saic_cq',
    'tj':'/queue/saic_tj',
    'sc':'/queue/saic_sc',
    'gd':'/queue/saic_gd',
    'hn':'/queue/saic_hn',
    'js':'/queue/saic_js',
    'nmg':'/queue/saic_nmg',
    'yn':'/queue/saic_yn',
    'hlj':'/queue/saic_hlj',
    'hen':'/queue/saic_hen',
    'ah':'/queue/saic_ah',
    'gx':'/queue/saic_gx',
    'bj':'/queue/saic_bj',
    'ln':'/queue/saic_ln',
    'xj':'/queue/saic_xj',
    'jx':'/queue/saic_jx',
    'sd':'/queue/saic_sd',
    'fj':'/queue/saic_fj',
    'gs':'/queue/saic_gs',
    'jl':'/queue/saic_jl',
    'hb':'/queue/saic_hb',
    'sx':'/queue/saic_sx',
    'qh':'/queue/saic_qh',
    'xz':'/queue/saic_xz',
    'sax':'/queue/saic_sax',
    'heb':'/queue/saic_heb',
    'nx':'/queue/saic_nx',
    'zj':'/queue/saic_zj',
    'sh':'/queue/saic_app_sh',
    }

# Province code -> Kafka consumer group id. Provinces absent from this map
# get group_id=None in kafka_consumer (KafkaConsumer default group).
topic_config = {
    'bj': 'bj_saic_2',
    'cq': 'saic_group_round_2',
    'jl': 'saic_group_round_2',
    'js': 'saic_group_round_2',
    'sd': 'saic_group_round_2',
    'hlj': 'saic_group_round_2',
    'gx': 'saic_group_round_2',
    'tj': 'saic_group_round_2',
    'fj': 'saic_group_round_2',
    'gd': 'saic_group_round_2',
    'zj': 'saic_group_round_2',
}
# Shared Kafka client for the producer path (kafka_producer).
kafka = KafkaClient(config.glob_conf.KAFAKA_HOST)


def kafka_producer(province):
    """
    组织机构代码放到queue里面
    """
    index = 0
    producer = SimpleProducer(kafka,async=True,batch_send_every_n=100,batch_send_every_t=60)
    for item in collection.find({'province':province}).batch_size(30):
        company_name = item.get('companyName')
        company_name = company_name.replace("(","（").replace(")","）")
        province = item.get('province')
        message = {'key':company_name,'province':province}
        producer.send_messages(QUEUE_DICT.get(province).split('/')[-1],json.dumps(message))
        index+=1
        if index%10000==0:
            logger.info('add message in kafka: %d'%index)

def kafka_consumer(province):
    """Generator: yield company names consumed from the province's Kafka topic.

    Each Kafka message carries a JSON payload whose ``key`` field is the
    company name. The message is acknowledged (task_done + commit) only
    after the name has been yielded and the caller resumes the generator.

    :param province: province code, must be a key of ``QUEUE_DICT``
    """
    logger.info('start kafka consumer')

    topic = QUEUE_DICT.get(province).split('/')[-1]
    consumer = KafkaConsumer(
        topic,
        bootstrap_servers=[config.glob_conf.KAFAKA_HOST],
        group_id=topic_config.get(province),
        auto_commit_enable=True,
        auto_commit_interval_ms=1 * 1000,
        fetch_message_max_bytes=1024 * 10240,
        fetch_min_bytes=1024 * 1024,
        fetch_wait_max_ms=100,
        auto_offset_reset='smallest',
    )

    for msg in consumer:
        payload = json.loads(msg.value)
        yield payload.get('key', '')
        # Ack after the caller has processed the previous name.
        consumer.task_done(msg)
        consumer.commit()


class MongoProducer(AsyPublisher):
    """Asynchronous RabbitMQ publisher that drains company names from Kafka
    into the province's RabbitMQ queue, throttled by the queue's depth.

    :param amqp_url: AMQP connection URL for the publisher
    :param queue_name: RabbitMQ queue to publish into
    :param province: province code, must be a key of ``QUEUE_DICT``
    :param count: target maximum queue depth (back-pressure threshold)
    :param host: RabbitMQ management host (for queue-size queries)
    :param user: RabbitMQ management user
    :param password: RabbitMQ management password
    """

    def __init__(self, amqp_url, queue_name, province, count, host, user, password):
        AsyPublisher.__init__(self, amqp_url=amqp_url, queue_name=queue_name)
        # Delegate to set_params so the attribute assignments live in one place.
        self.set_params(province, count, host, user, password)

    def set_params(self, province, count, host, user, password):
        """(Re)configure the province/queue and RabbitMQ management credentials."""
        self.province = province
        self.count = count
        self.QUEUE = QUEUE_DICT.get(self.province).split('/')[-1]
        self.host = host
        self.user = user
        self.password = password
        # Was a bare py2 `print`; use the module logger like the rest of the file.
        logger.info('province=%s count=%s', self.province, self.count)

    def publish_message(self):
        """One publishing round: if the queue is near-full, back off; otherwise
        top it up from Kafka until roughly ``count`` messages deep, then
        reconnect (presumably to re-arm the async publisher — TODO confirm
        against AsyPublisher's contract).
        """
        temp_count = 0
        queue_size = get_queue_size(host=self.host, queue_name=self.QUEUE, user=self.user, password=self.password)
        if queue_size >= self.count:
            # Queue already full enough; back off for 5 minutes.
            time.sleep(60 * 5)
        else:
            time.sleep(1)
            for company_name in kafka_consumer(self.province):
                message = {'key': company_name}
                properties = pika.BasicProperties(app_id='example-publisher',
                                                  content_type='application/json',
                                                  headers=message)
                self._channel.basic_publish(
                    '',
                    self.QUEUE,
                    json.dumps(message),
                    properties)
                temp_count += 1
                if temp_count % 100 == 0:
                    logger.info(temp_count)

                # NOTE(review): queue_size is the snapshot taken before the loop;
                # other consumers draining the queue meanwhile are not accounted for.
                if self.count - queue_size <= temp_count:
                    break

        self.reconnect()

if __name__ == "__main__":
    # Usage: python <script> <province> <kafka|rb>
    #   kafka  - seed the province's Kafka topic from Mongo
    #   rb     - run the Kafka -> RabbitMQ bridge
    if len(sys.argv) < 3:
        sys.exit('usage: %s <province> <kafka|rb>' % sys.argv[0])

    rb_province = sys.argv[1]
    t_t = sys.argv[2]
    if rb_province not in QUEUE_DICT:
        # Fail fast with a clear message instead of an AttributeError deep
        # inside QUEUE_DICT.get(...).split(...) later on.
        sys.exit('unknown province: %s (expected one of %s)' % (rb_province, ', '.join(sorted(QUEUE_DICT))))

    if t_t == 'kafka':
        kafka_producer(rb_province)
    elif t_t == 'rb':
        rb_amqp_url = config.glob_conf.amqp_url
        rb_queue_name = QUEUE_DICT.get(rb_province).split('/')[-1]
        rb_host = config.glob_conf.rb_host
        rb_count = config.glob_conf.QUEUE_MAX_SIZE
        rb_user = config.glob_conf.rb_user
        rb_password = config.glob_conf.rb_password

        producer = MongoProducer(amqp_url=rb_amqp_url, queue_name=rb_queue_name, province=rb_province, count=rb_count,
                                 host=rb_host, user=rb_user, password=rb_password)

        try:
            producer.run()
        except KeyboardInterrupt:
            # Graceful shutdown of the async publisher's ioloop.
            producer.stop()